package com.lyon.demo.netty.core;

import com.lyon.demo.rpc.api.core.Command;
import com.lyon.demo.rpc.api.core.RespondFuture;
import com.lyon.demo.rpc.api.endpoint.Transport;
import io.netty.channel.Channel;
import lombok.AllArgsConstructor;

import java.util.concurrent.CompletableFuture;

/**
 * @author Lyon
 */
public class NettyTransport implements Transport {

    private final Channel channel;
    private final InFlightRequests inFlightRequests;

    public NettyTransport(Channel channel, InFlightRequests inFlightRequests) {
        this.channel = channel;
        this.inFlightRequests = inFlightRequests;
    }

    @Override
    public CompletableFuture<Command> send(Command request) {
        CompletableFuture<Command> completableFuture = new CompletableFuture<>();
        final long requestId = request.getHeader().getRequestId();
        final RespondFuture respondFuture = new RespondFuture(requestId, completableFuture);
        inFlightRequests.putRequest(respondFuture);
        try {
            if (!channel.isActive()) {
                channel.connect(channel.remoteAddress());
            }
            channel.writeAndFlush(request).addListener(future -> {
                if (!future.isSuccess()) {
                    completableFuture.completeExceptionally(future.cause());
                }
            });
        } catch (Exception e) {
            inFlightRequests.remove(requestId);
        }
        return completableFuture;
    }

}
